home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
MACD 5
/
MACD 5.bin
/
workbench
/
libs
/
queue.lzh
/
queue_3.1
/
queue_library.c
< prev
next >
Wrap
C/C++ Source or Header
|
1996-12-02
|
11KB
|
478 lines
/*
queue_library.c --- queue library interface.
Copyright (c) 1995 SHW Wabnitz
Written by Bernhard Fastenrath (fasten@shw.com)
This file may be distributed under the terms
of the GNU General Public License.
*/
#if defined (__GNUC__)
#include <stabs.h>
#endif
#include "queue_library.h"
struct ExecBase *SysBase = NULL;
struct Library *QueueBase = NULL;
Semaphore QueuesSemaphore;
List Queues;
#if defined (__GNUC__)
const BYTE LibName[] = "queue.library";
const BYTE LibIdString[] = "$VER: queue.library 3.1 (12-3-96)";
const UWORD LibVersion = 3;
const UWORD LibRevision = 1;
#endif
#if defined (__GNUC__)
#define LIBRT
#define REG(regname)
#define STRUCT_MYLIB struct Library
#elif defined (__SASC)
#define LIBRT __saveds __asm
#define REG(regname) register __ ## regname
#define ADDTABL_1(name,arg1);
#define ADDTABL_2(name,arg1,arg2);
#define ADDTABL_3(name,arg1,arg2,arg3);
#define ADDTABL_END();
#define STRUCT_MYLIB struct MyLibrary
#endif
/*** configuration ***/
#ifdef SERIALIZE_WITH_FORBID
#define ObtainSemaphore(s) Forbid()
#define ReleaseSemaphore(s) Permit()
#endif
/*** internal functions ***/
#if defined (__GNUC__)
static ULONG
strlen (char *name)
{
ULONG t = 0;
while (name[t])
t++;
return t;
}
#endif
static void
SetMarker (QueueHandle *qh, QMessage *msg)
{
QueueNode *qn = qh -> qh_QNode;
if (qh -> qh_un.qhl.qhl_MinNode.mln_Succ)
Remove ((Node *) &qh -> qh_un.qhl.qhl_MinNode);
Insert (&qn -> qn_List, (Node *) &qh -> qh_un.qhl.qhl_MinNode, (Node *) msg);
}
static void
ClearMarker (QueueHandle *qh)
{
Remove ((Node *) &qh -> qh_un.qhl.qhl_MinNode);
qh -> qh_un.qhl.qhl_MinNode.mln_Succ = NULL;
}
static void
RemoveAndReply (QMessage *msg)
{
QueueHandle *qh;
if (!msg -> qm_Refs)
{
qh = (QueueHandle *) msg -> qm_Owner;
Remove ((Node *) msg);
/*** reply to owner ***/
/* ObtainSemaphore (); */
AddHead (&qh -> qh_un.qhs.qhs_ReplyList, (Node *) msg);
Signal (qh -> qh_SigTask, qh -> qh_SigMask);
/* ReleaseSemaphore (); */
}
else
msg -> qm_Status = QMS_REMOVED;
}
static void
ReplyQMessage (QueueNode *qn, QMessage *msg)
{
msg -> qm_Refs --;
msg -> qm_Replies ++;
if (msg -> qm_Replies >= qn -> qn_Read || msg -> qm_Status == QMS_REMOVED)
RemoveAndReply (msg);
}
/*** library interface ***/
int LIBRT
__UserLibInit (REG(a6) STRUCT_MYLIB *libbase)
{
SysBase = *(struct ExecBase **)4;
QueueBase = (struct Library *) libbase;
#ifndef SERIALIZE_WITH_FORBID
InitSemaphore (&QueuesSemaphore);
#endif
NewList (&Queues);
return 0; /* success */
}
void LIBRT
__UserLibCleanup (REG(a6) STRUCT_MYLIB *libbase)
{
}
ADDTABL_3(LIBQOpen,a0,d0,d1);
QHandle LIBRT
LIBQOpen (REG(a0) STRPTR name, REG(d0) ULONG mode, REG(d1) ULONG sigbit)
{
QueueHandle *qh;
QueueNode *qn;
ULONG len;
if (!(qh = (QueueHandle *) AllocMem (sizeof (QueueHandle), MEMF_PUBLIC | MEMF_CLEAR)))
return NULL;
ObtainSemaphore (&QueuesSemaphore);
if (!(qn = (QueueNode *) FindName (&Queues, name)))
{
if (!(qn = AllocMem (sizeof (QueueNode), MEMF_PUBLIC | MEMF_CLEAR)))
{
ReleaseSemaphore (&QueuesSemaphore);
FreeMem (qh, sizeof (QueueHandle));
return NULL;
}
#ifndef SERIALIZE_WITH_FORBID
InitSemaphore (&qn -> qn_Semaphore);
#endif
ObtainSemaphore (&qn -> qn_Semaphore);
len = strlen (name) + 1;
if (!(qn -> qn_Node.ln_Name = AllocMem (len, MEMF_PUBLIC)))
{
ReleaseSemaphore (&qn -> qn_Semaphore);
ReleaseSemaphore (&QueuesSemaphore);
FreeMem (qh, sizeof (QueueHandle));
FreeMem (qn, sizeof (QueueNode));
return NULL;
}
CopyMem (name, qn -> qn_Node.ln_Name, len);
AddHead (&Queues, (Node *) qn);
NewList (&qn -> qn_Handles);
NewList (&qn -> qn_List);
qn -> qn_Refs = 1;
}
else
{
ObtainSemaphore (&qn -> qn_Semaphore);
qn -> qn_Refs ++;
}
AddHead (&qn -> qn_Handles, (Node *) qh);
qh -> qh_Mode = mode;
qh -> qh_QNode = qn;
qh -> qh_SigMask = 1 << sigbit;
qh -> qh_SigTask = FindTask (0);
if (mode == QMODE_LISTEN)
{
qn -> qn_Read ++;
qh -> qh_un.qhl.qhl_Status = QMS_MARKER;
Signal (qh -> qh_SigTask, qh -> qh_SigMask);
}
else
{
NewList (&qh -> qh_un.qhs.qhs_ReplyList);
}
ReleaseSemaphore (&qn -> qn_Semaphore);
ReleaseSemaphore (&QueuesSemaphore);
return (QHandle) qh;
}
ADDTABL_1(LIBQClose,a0);
ULONG LIBRT
LIBQClose (REG(a0) QHandle qhandle)
{
QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
QueueHandle *qh = qhandle;
QMessage *cmsg, *msg, *next;
ULONG count;
/* to avoid a possible deadlock with QOpen() the QueuesSemaphore
has to be locked before the QNode's semaphore here.
*/
ObtainSemaphore (&QueuesSemaphore);
ObtainSemaphore (&qn -> qn_Semaphore);
if (qh -> qh_Mode == QMODE_SEND)
{
if (count = qh -> qh_un.qhs.qhs_MsgCount)
{
ReleaseSemaphore (&QueuesSemaphore);
ReleaseSemaphore (&qn -> qn_Semaphore);
return count;
}
}
else
{
if (cmsg = (QMessage *) qh -> qh_un.qhl.qhl_MinNode.mln_Succ)
{
ClearMarker (qh);
next = (QMessage *) cmsg -> qm_MinNode.mln_Pred;
}
else if (cmsg = qh -> qh_un.qhl.qhl_Message)
{
cmsg -> qm_Refs --;
cmsg -> qm_Replies ++;
next = cmsg;
}
if (next)
{
for (msg = next; next = (QMessage *) msg -> qm_MinNode.mln_Pred;
msg = next)
{
/* Replies from the current task
don't count after QClose().
*/
if (msg -> qm_Status == QMS_ACTIVE || msg -> qm_Status == QMS_REMOVED)
msg -> qm_Replies --;
}
}
else
cmsg = (QMessage *) qn -> qn_List.lh_Head;
for (msg = cmsg; next = (QMessage *) msg -> qm_MinNode.mln_Succ; msg = next)
{
/* These messages might be in the queue, waiting only
for the current task; let's give them a chance.
*/
if ((msg -> qm_Status == QMS_ACTIVE && msg -> qm_Replies >= qn -> qn_Read)
|| msg -> qm_Status == QMS_REMOVED)
{
RemoveAndReply (msg);
}
if (msg -> qm_Status == QMS_MARKER)
break;
}
qn -> qn_Read --;
}
Remove ((Node *) qh);
FreeMem (qh, sizeof (QueueHandle));
if (! -- qn -> qn_Refs)
{
ReleaseSemaphore (&qn -> qn_Semaphore);
Remove ((Node *) qn);
FreeMem (qn -> qn_Node.ln_Name, strlen (qn -> qn_Node.ln_Name) + 1);
FreeMem (qn, sizeof (QueueNode));
}
else
ReleaseSemaphore (&qn -> qn_Semaphore);
ReleaseSemaphore (&QueuesSemaphore);
return 0;
}
ADDTABL_2(LIBQAddMsg,a0,a1);
void LIBRT
LIBQAddMsg (REG(a0) QHandle qhandle, REG(a1) QMessage *msg)
{
QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
QueueHandle *qh = (QueueHandle *) qhandle;
msg -> qm_Owner = qhandle;
msg -> qm_Refs = 0;
msg -> qm_Replies = 0;
msg -> qm_Status = QMS_ACTIVE;
ObtainSemaphore (&qn -> qn_Semaphore);
AddTail (&qn -> qn_List, (Node *) msg);
qh -> qh_un.qhs.qhs_MsgCount ++;
if (qn -> qn_Read)
{
for (qh = (QueueHandle *) qn -> qn_Handles.lh_Head; qh -> qh_MinNode.mln_Succ;
qh = (QueueHandle *) qh -> qh_MinNode.mln_Succ)
{
if (qh -> qh_Mode == QMODE_LISTEN)
Signal (qh -> qh_SigTask, qh -> qh_SigMask);
}
}
else
RemoveAndReply (msg);
ReleaseSemaphore (&qn -> qn_Semaphore);
}
ADDTABL_2(LIBQRemMsg,a0,a1);
void LIBRT
LIBQRemMsg (REG(a0) QHandle qhandle, REG(a1) QMessage *msg)
{
QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
ObtainSemaphore (&qn -> qn_Semaphore);
RemoveAndReply (msg);
ReleaseSemaphore (&qn -> qn_Semaphore);
}
ADDTABL_1(LIBQGetMsg,a0);
QMessage * LIBRT
LIBQGetMsg (REG(a0) QHandle qhandle)
{
QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
QueueHandle *qh = (QueueHandle *) qhandle;
QMessage *msg, *next;
ObtainSemaphore (&qn -> qn_Semaphore);
/* A server ( QMODE_SEND ) retrieves a replied message */
if (qh -> qh_Mode == QMODE_SEND)
{
if (msg = (QMessage *) RemTail (&qh -> qh_un.qhs.qhs_ReplyList))
qh -> qh_un.qhs.qhs_MsgCount --;
ReleaseSemaphore (&qn -> qn_Semaphore);
return msg;
}
/* A client ( QMODE_LISTEN ) reads a message */
if (msg = qh -> qh_un.qhl.qhl_Message)
{
/* automagically reply the current message */
next = (QMessage *) msg -> qm_MinNode.mln_Succ;
ReplyQMessage (qn, msg);
msg = next;
}
else /* no current message, start at marker or list head */
{
if (msg = (QMessage *) qh -> qh_un.qhl.qhl_MinNode.mln_Succ)
{
ClearMarker (qh);
}
else
msg = (QMessage *) qn -> qn_List.lh_Head;
}
for (;;) /* find the next message */
{
if (!(next = (QMessage *) msg -> qm_MinNode.mln_Succ))
{
SetMarker (qh, (QMessage *) msg -> qm_MinNode.mln_Pred);
msg = NULL; /* no message available */
break;
}
if (msg -> qm_Status & QMS_INACTIVE)
{
if (msg -> qm_Status == QMS_REMOVED)
RemoveAndReply (msg);
msg = next;
continue;
}
msg -> qm_Refs ++;
break;
}
qh -> qh_un.qhl.qhl_Message = msg;
ReleaseSemaphore (&qn -> qn_Semaphore);
return msg;
}
ADDTABL_1(LIBQReplyMsg,a0);
ULONG LIBRT
LIBQReplyMsg (REG(a0) QHandle qhandle)
{
QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
QueueHandle *qh = (QueueHandle *) qhandle;
QMessage *msg, *next;
if (!(msg = qh -> qh_un.qhl.qhl_Message))
return 0;
ObtainSemaphore (&qn -> qn_Semaphore);
SetMarker (qh, msg);
next = (QMessage *) msg -> qm_MinNode.mln_Succ;
ReplyQMessage (qn, msg);
if (next)
{
for (msg = next; next = (QMessage *) msg -> qm_MinNode.mln_Succ; msg = next)
{
if (msg -> qm_Status == QMS_ACTIVE)
{
/* Since QReplyMsg() means you're not going to read more messages now
it's probably a good idea to remind you that there's more.
*/
Signal (qh -> qh_SigTask, qh -> qh_SigMask);
break;
}
}
}
qh -> qh_un.qhl.qhl_Message = NULL;
ReleaseSemaphore (&qn -> qn_Semaphore);
return 1;
}
ADDTABL_1(LIBQFlush,a0);
ULONG LIBRT
LIBQFlush (REG(a0) QHandle qhandle)
{
QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
QMessage *msg, *next;
ObtainSemaphore (&qn -> qn_Semaphore);
msg = (QMessage *) qn -> qn_List.lh_Head;
while (next = (QMessage *) msg -> qm_MinNode.mln_Succ)
{
if (msg -> qm_Status != QMS_MARKER)
RemoveAndReply (msg);
msg = next;
}
ReleaseSemaphore (&qn -> qn_Semaphore);
return 1;
}
/* new in 3.0 */
ADDTABL_1(LIBQAllocMsg,d0);
QMessage * LIBRT
LIBQAllocMsg (REG(d0) ULONG size)
{
QMessage *msg;
if (!(msg = AllocMem (sizeof (QMessage), MEMF_PUBLIC | MEMF_CLEAR)))
return NULL;
if (size)
{
if (!(msg -> qm_Data = AllocMem (size, MEMF_SHARED_READ)))
{
FreeMem (msg, sizeof (QMessage));
return NULL;
}
}
return msg;
}
ADDTABL_2(LIBQFreeMsg,a0,d0);
void LIBRT
LIBQFreeMsg (REG(a0) QMessage *msg, REG(d0) ULONG size)
{
if (msg -> qm_Data)
FreeMem (msg -> qm_Data, size);
FreeMem (msg, sizeof (QMessage));
}
ADDTABL_END();